package org.hscoder.springboot.hbase;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.RowMapper;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.List;

/**
 * Service that creates, reads, writes and drops the {@value #TABLE} HBase table.
 *
 * <p>Each row carries two cells: {@link #CF_NAME}:{@link #QUALIFIER_NAME} and
 * {@link #CF_STATE}:{@link #QUALIFIER_STATE}. Data access goes through the injected
 * {@link HbaseTemplate}; table administration uses an {@link HBaseAdmin} built from the
 * injected {@link Configuration}.
 */
@Service
public class DeviceStateService {

    public static final Logger logger = LoggerFactory.getLogger(DeviceStateService.class);

    @Autowired
    private HbaseTemplate hbaseTemplate;
    @Autowired
    private Configuration configuration;

    /** Name of the HBase table managed by this service. */
    public static final String TABLE = "DeviceState";
    /** Column family that stores the name cell. */
    public static final String CF_NAME = "name";
    /** Column family that stores the state cell. */
    public static final String CF_STATE = "state";

    /** Qualifier of the name cell inside {@link #CF_NAME}. */
    public static final String QUALIFIER_NAME = "c1";
    /** Qualifier of the state cell inside {@link #CF_STATE}. */
    public static final String QUALIFIER_STATE = "c2";

    /**
     * Creates the table (with both column families) if it does not exist yet.
     * Runs once after dependency injection because of {@link PostConstruct}.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the admin call fails
     */
    @PostConstruct
    public void createTable() {
        HBaseAdmin admin = null;
        try {
            admin = new HBaseAdmin(configuration);
            TableName tableName = TableName.valueOf(TABLE);

            if (admin.tableExists(tableName)) {
                // getNameAsString(): getName() returns byte[], which would be logged as raw bytes.
                logger.warn("table:{} exists!", tableName.getNameAsString());
            } else {
                // Declare one column family per stored attribute.
                HTableDescriptor tableDesc = new HTableDescriptor(tableName);
                tableDesc.addFamily(new HColumnDescriptor(CF_NAME));
                tableDesc.addFamily(new HColumnDescriptor(CF_STATE));

                admin.createTable(tableDesc);
                logger.info("create table:{} success!", tableName.getNameAsString());
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to create HBase table " + TABLE, e);
        } finally {
            closeQuietly(admin);
        }
    }

    /**
     * Writes one record: the name cell and the state cell of the given row.
     * The two cells are written with two separate {@code put} calls.
     *
     * @param rowkey row key of the record
     * @param name   value stored in {@link #CF_NAME}:{@link #QUALIFIER_NAME}
     * @param state  value stored in {@link #CF_STATE}:{@link #QUALIFIER_STATE}
     */
    public void add(String rowkey, String name, String state) {
        hbaseTemplate.put(TABLE, rowkey, CF_NAME, QUALIFIER_NAME, Bytes.toBytes(name));
        hbaseTemplate.put(TABLE, rowkey, CF_STATE, QUALIFIER_STATE, Bytes.toBytes(state));
    }

    /**
     * Looks up a single record by row key.
     *
     * @param rowkey row key to read
     * @return the row mapped by {@link DeviceStateRowMapper}
     */
    public DeviceState get(String rowkey) {
        return hbaseTemplate.get(TABLE, rowkey, new DeviceStateRowMapper());
    }

    /**
     * Scans the table with optional prefix and row-range restrictions.
     *
     * @param prefix   row-key prefix to filter on; ignored when {@code null} or empty
     * @param rowStart start row of the scan; ignored when {@code null} or empty
     * @param rowEnd   stop row of the scan; ignored when {@code null} or empty
     * @param max      row limit handed to {@link Scan#setLimit(int)}
     * @return the matching rows mapped by {@link DeviceStateRowMapper}
     */
    public List<DeviceState> find(String prefix, String rowStart, String rowEnd, int max) {
        Scan scan = new Scan();

        byte[] startRow = StringUtils.isEmpty(rowStart) ? null : Bytes.toBytes(rowStart);
        if (!StringUtils.isEmpty(prefix)) {
            byte[] prefixBytes = Bytes.toBytes(prefix);
            scan.setFilter(new PrefixFilter(prefixBytes));
            // No key that sorts before the prefix can match it, so begin the scan at the
            // prefix (unless the caller's start row is already further along) instead of
            // reading and discarding every earlier row.
            if (startRow == null || Bytes.compareTo(prefixBytes, startRow) > 0) {
                startRow = prefixBytes;
            }
        }
        if (startRow != null) {
            scan.setStartRow(startRow);
        }
        if (!StringUtils.isEmpty(rowEnd)) {
            scan.setStopRow(Bytes.toBytes(rowEnd));
        }
        scan.setLimit(max);
        return hbaseTemplate.find(TABLE, scan, new DeviceStateRowMapper());
    }

    /**
     * Deletes a row by removing both of its column families.
     *
     * @param rowkey row key to delete
     */
    public void delete(String rowkey) {
        hbaseTemplate.delete(TABLE, rowkey, CF_NAME);
        hbaseTemplate.delete(TABLE, rowkey, CF_STATE);
    }

    /**
     * Drops the table if it exists; does nothing otherwise.
     *
     * @throws RuntimeException wrapping the {@link IOException} if an admin call fails
     */
    public void dropTable() {
        HBaseAdmin admin = null;
        try {
            admin = new HBaseAdmin(configuration);
            TableName tableName = TableName.valueOf(TABLE);

            if (admin.tableExists(tableName)) {
                // HBase only deletes disabled tables; deleting an enabled one fails.
                if (admin.isTableEnabled(tableName)) {
                    admin.disableTable(tableName);
                }
                admin.deleteTable(tableName);
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to drop HBase table " + TABLE, e);
        } finally {
            closeQuietly(admin);
        }
    }

    /**
     * Closes the admin handle, logging (not propagating) a failure to close.
     *
     * @param admin handle to close; may be {@code null} when construction failed
     */
    private static void closeQuietly(HBaseAdmin admin) {
        if (admin == null) {
            return;
        }
        try {
            admin.close();
        } catch (IOException e) {
            logger.error("error occurs.", e);
        }
    }

    /**
     * {@link RowMapper} that turns one HBase {@link Result} into a {@link DeviceState}.
     */
    public static class DeviceStateRowMapper implements RowMapper<DeviceState> {

        // Encoded once; these never change and were previously re-encoded for every row.
        private static final byte[] NAME_FAMILY = Bytes.toBytes(CF_NAME);
        private static final byte[] NAME_QUALIFIER = Bytes.toBytes(QUALIFIER_NAME);
        private static final byte[] STATE_FAMILY = Bytes.toBytes(CF_STATE);
        private static final byte[] STATE_QUALIFIER = Bytes.toBytes(QUALIFIER_STATE);

        /**
         * Builds a {@link DeviceState} from the row key, the name cell and the state cell.
         *
         * @param result the HBase row
         * @param rowNum index of the row in the result set (unused)
         * @return the mapped device state
         */
        @Override
        public DeviceState mapRow(Result result, int rowNum) throws Exception {
            String name = Bytes.toString(result.getValue(NAME_FAMILY, NAME_QUALIFIER));
            String state = Bytes.toString(result.getValue(STATE_FAMILY, STATE_QUALIFIER));
            String id = Bytes.toString(result.getRow());
            return new DeviceState(id, name, state);
        }
    }

}
